@@ -74,7 +74,7 @@ module Agents
 
         The `headers` field is optional. When present, it should be a hash of headers to send with the request.
 
-        The WebsiteAgent can also scrape based on incoming events. It will scrape the url contained in the `url` key of the incoming event payload.
+        The WebsiteAgent can also scrape based on incoming events. It will scrape the url contained in the `url` key of the incoming event payload. If you specify `merge` as the mode, it will retain the old payload and update it with the new values.
 
         In Liquid templating, the following variable is available:
 
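
Note: `merge` mode has plain `Hash#merge` semantics: the incoming event's payload is retained, and the scraped values are layered on top, winning any key collisions. A minimal sketch with hypothetical payloads (not taken from the diff):

    old_payload = { 'url' => 'http://example.com/', 'source' => 'rss' }  # incoming event payload
    new_values  = { 'title' => 'Example Page' }                          # freshly scraped values
    old_payload.merge(new_values)
    # => {"url"=>"http://example.com/", "source"=>"rss", "title"=>"Example Page"}
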
@@ -120,7 +120,7 @@ module Agents
 
       # Check for optional fields
       if options['mode'].present?
-        errors.add(:base, "mode must be set to on_change or all") unless %w[on_change all].include?(options['mode'])
+        errors.add(:base, "mode must be set to on_change, all or merge") unless %w[on_change all merge].include?(options['mode'])
       end
 
       if options['expected_update_period_in_days'].present?
@@ -148,66 +148,70 @@ module Agents
     end
 
     def check
-      check_url interpolated['url']
+      check_urls(interpolated['url'])
     end
 
-    def check_url(in_url)
+    def check_urls(in_url)
       return unless in_url.present?
 
       Array(in_url).each do |url|
-        log "Fetching #{url}"
-        response = faraday.get(url)
-        raise "Failed: #{response.inspect}" unless response.success?
-
-        interpolation_context.stack {
-          interpolation_context['_response_'] = ResponseDrop.new(response)
-          body = response.body
-          if (encoding = interpolated['force_encoding']).present?
-            body = body.encode(Encoding::UTF_8, encoding)
-          end
-          doc = parse(body)
+        check_url(url)
+      end
+    end
 
-          if extract_full_json?
-            if store_payload!(previous_payloads(1), doc)
-              log "Storing new result for '#{name}': #{doc.inspect}"
-              create_event :payload => doc
-            end
-            next
-          end
+    def check_url(url, payload = {})
+      log "Fetching #{url}"
+      response = faraday.get(url)
+      raise "Failed: #{response.inspect}" unless response.success?
 
-          output =
-            case extraction_type
-            when 'json'
-              extract_json(doc)
-            when 'text'
-              extract_text(doc)
-            else
-              extract_xml(doc)
-            end
+      interpolation_context.stack {
+        interpolation_context['_response_'] = ResponseDrop.new(response)
+        body = response.body
+        if (encoding = interpolated['force_encoding']).present?
+          body = body.encode(Encoding::UTF_8, encoding)
+        end
+        doc = parse(body)
 
-          num_unique_lengths = interpolated['extract'].keys.map { |name| output[name].length }.uniq
+        if extract_full_json?
+          if store_payload!(previous_payloads(1), doc)
+            log "Storing new result for '#{name}': #{doc.inspect}"
+            create_event payload: payload.merge(doc)
+          end
+          return
+        end
 
-          if num_unique_lengths.length != 1
-            raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
+        output =
+          case extraction_type
+          when 'json'
+            extract_json(doc)
+          when 'text'
+            extract_text(doc)
+          else
+            extract_xml(doc)
           end
 
-          old_events = previous_payloads num_unique_lengths.first
-          num_unique_lengths.first.times do |index|
-            result = {}
-            interpolated['extract'].keys.each do |name|
-              result[name] = output[name][index]
-              if name.to_s == 'url'
-                result[name] = (response.env[:url] + result[name]).to_s
-              end
-            end
+        num_unique_lengths = interpolated['extract'].keys.map { |name| output[name].length }.uniq
+
+        if num_unique_lengths.length != 1
+          raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
+        end
 
-            if store_payload!(old_events, result)
-              log "Storing new parsed result for '#{name}': #{result.inspect}"
-              create_event :payload => result
+        old_events = previous_payloads num_unique_lengths.first
+        num_unique_lengths.first.times do |index|
+          result = {}
+          interpolated['extract'].keys.each do |name|
+            result[name] = output[name][index]
+            if name.to_s == 'url'
+              result[name] = (response.env[:url] + result[name]).to_s
             end
           end
-        }
-      end
+
+          if store_payload!(old_events, result)
+            log "Storing new parsed result for '#{name}': #{result.inspect}"
+            create_event payload: payload.merge(result)
+          end
+        end
+      }
     rescue => e
      error "Error when fetching url: #{e.message}\n#{e.backtrace.join("\n")}"
     end
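
Note: after this refactor, `check_urls` only iterates over the configured URLs, while `check_url` performs a single fetch and threads an optional seed `payload` into every event it creates. A sketch of the two call paths (hypothetical arguments):

    check_url('http://example.com/')                      # scheduled check: payload defaults to {}
    check_url('http://example.com/', 'source' => 'rss')   # event-driven check in merge mode
    # in both cases each extracted result is emitted via:
    #   create_event payload: payload.merge(result)
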
@@ -216,7 +220,9 @@ module Agents
       incoming_events.each do |event|
         interpolate_with(event) do
           url_to_scrape = event.payload['url']
-          check_url(url_to_scrape) if url_to_scrape =~ /^https?:\/\//i
+          next unless url_to_scrape =~ /^https?:\/\//i
+          check_url(url_to_scrape,
+                    interpolated['mode'].to_s == "merge" ? event.payload : {})
         end
       end
     end
@@ -238,7 +244,7 @@ module Agents
           end
         end
         true
-      when 'all', ''
+      when 'all', 'merge', ''
         true
       else
         raise "Illegal options[mode]: #{interpolated['mode']}"
--- a/spec/models/agents/website_agent_spec.rb
+++ b/spec/models/agents/website_agent_spec.rb
@@ -521,6 +521,16 @@ fire: hot
 
       expect(Event.last.payload['response_info']).to eq('The reponse from XKCD was 200 OK.')
     end
+
+    it "should support merging of events" do
+      expect {
+        @checker.options = @valid_options
+        @checker.options[:mode] = "merge"
+        @checker.receive([@event])
+      }.to change { Event.count }.by(1)
+      last_payload = Event.last.payload
+      expect(last_payload['link']).to eq('Random')
+    end
   end
 end
 